KAFKA-17182: Consumer fetch sessions are evicted too quickly with AsyncKafkaConsumer #17700
Conversation
… the new consumer
Updated the FetchRequestManager to only create and enqueue fetch requests when signaled to do so by a FetchEvent.
…om prepareFetchRequests()
Fixed typo
@kirktrue: Thanks for the updated PR. The code LGTM. Are the test failures related?
I don't believe they are, no. I'll look at the failures from the current test run and dig around a little to see if others are hitting them too, then report back. Thanks.
@junrao, the majority of the errors I see in the latest test run are not related. The following test failure occurs on both Java 17 and 23, but the issue has already been filed several times:
The following tests are flaky and have issues filed:
The only issue that isn't filed is this:
I'll see if I can reproduce that flaky test locally.
@junrao, I wasn't able to reproduce the flaky behavior in that test.
@junrao, the latest test run has a few flaky tests, but they're all known flaky tests that are filed in Jira. Are we able to merge this change, or should we wait for a green build? Thanks!
@kirktrue, the fix for the failed test here has been merged to trunk and builds are green again. Get the latest changes and we should be green here too.
@junrao @lianetm @jeffkbkim, all green! Can we merge? 🥺
@kirktrue: Thanks for triaging the tests. LGTM
🥳
LGTM too, thanks @kirktrue!
@kirktrue: Do we want to create a separate PR to cherry-pick this to 4.0?
Yes. Is that step performed by the merger or the contributor? Sometimes the person merging to … Thanks!
…ncKafkaConsumer (#17700)

This change reduces fetch session cache evictions on the broker for AsyncKafkaConsumer by altering its logic to determine which partitions it includes in fetch requests.

Background

Consumer implementations fetch data from the cluster and temporarily buffer it in memory until the user next calls Consumer.poll(). When a fetch request is being generated, partitions that already have buffered data are not included in the fetch request.

The ClassicKafkaConsumer performs much of its fetch logic and network I/O in the application thread. On poll(), if there is any locally-buffered data, the ClassicKafkaConsumer does not fetch any new data and simply returns the buffered data to the user from poll().

On the other hand, the AsyncKafkaConsumer splits its logic and network I/O between two threads, which results in a potential race condition during fetch. The AsyncKafkaConsumer also checks for buffered data on its application thread. If it finds there is none, it signals the background thread to create a fetch request. However, it's possible for the background thread to receive data from a previous fetch and buffer it before the fetch request logic starts. When that occurs, as the background thread creates a new fetch request, it skips any buffered data, which has the unintended result that those partitions are added to the fetch request's "to remove" set. This signals the broker to remove those partitions from its internal cache.

This issue is technically possible in the ClassicKafkaConsumer too, since the heartbeat thread performs network I/O in addition to the application thread. However, because of the frequency at which the AsyncKafkaConsumer's background thread runs, it is ~100x more likely to happen.

Options

The core decision is: what should the background thread do if it is asked to create a fetch request and it discovers there's buffered data? There were multiple proposals to address this issue in the AsyncKafkaConsumer. Among them are:

1. The background thread should omit buffered partitions from the fetch request, as before (the existing behavior)
2. The background thread should skip fetch request generation entirely if there are any buffered partitions
3. The background thread should include buffered partitions in the fetch request, but use a small "max bytes" value
4. The background thread should skip fetching from the nodes that have buffered partitions

Option 4 won out. The change is localized to AbstractFetch, where the basic idea is to skip fetch requests to a given node if that node is the leader for buffered data. By preventing a fetch request from being sent to that node, it won't have any "holes" where the buffered partitions should be.

Reviewers: Lianet Magrans <[email protected]>, Jeff Kim <[email protected]>, Jun Rao <[email protected]>
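For background on the "to remove" set mentioned above: with incremental fetch sessions (KIP-227), a partition that was part of the session but is absent from the next request is treated as forgotten, and the broker drops it from its session cache. Below is a simplified model of that diff using hypothetical names (the real client-side bookkeeping lives in `FetchSessionHandler`):

```java
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

// Simplified model of an incremental fetch session diff: partitions present in
// the session but missing from the next request are marked for removal.
public class IncrementalFetchDiff {

    public record Diff(Set<TopicPartition> added, Set<TopicPartition> removed) { }

    public static Diff diff(Set<TopicPartition> sessionPartitions,
                            Set<TopicPartition> nextRequestPartitions) {
        Set<TopicPartition> added = new HashSet<>(nextRequestPartitions);
        added.removeAll(sessionPartitions);

        // Buffered partitions that were skipped while building the next request
        // end up here, even though the consumer still wants their data.
        Set<TopicPartition> removed = new HashSet<>(sessionPartitions);
        removed.removeAll(nextRequestPartitions);

        return new Diff(added, removed);
    }
}
```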
Thanks for pushing this through, @kirktrue!
Hey folks, I'm tracing the failure of https://develocity.apache.org/scans/tests?search.rootProjectNames=kafka&search.timeZoneId=America%2FLos_Angeles&tests.container=org.apache.kafka.streams.integration.StandbyTaskEOSMultiRebalanceIntegrationTest&tests.test=shouldHonorEOSWhenUsingCachingAndStandbyReplicas() back to this PR. Can we look into this? It failed only for this PR on Jan 27, 28, and 29, and then, once it was merged, we started seeing the failure a lot on trunk.
…with AsyncKafkaConsumer (apache#17700)" This reverts commit 9c02072.
This change reduces fetch session cache evictions on the broker for `AsyncKafkaConsumer` by altering its logic to determine which partitions it includes in fetch requests.

Background

`Consumer` implementations fetch data from the cluster and temporarily buffer it in memory until the user next calls `Consumer.poll()`. When a fetch request is being generated, partitions that already have buffered data are not included in the fetch request.

The `ClassicKafkaConsumer` performs much of its fetch logic and network I/O in the application thread. On `poll()`, if there is any locally-buffered data, the `ClassicKafkaConsumer` does not fetch any new data and simply returns the buffered data to the user from `poll()`.

On the other hand, the `AsyncKafkaConsumer` splits its logic and network I/O between two threads, which results in a potential race condition during fetch. The `AsyncKafkaConsumer` also checks for buffered data on its application thread. If it finds there is none, it signals the background thread to create a fetch request. However, it's possible for the background thread to receive data from a previous fetch and buffer it before the fetch request logic starts. When that occurs, as the background thread creates a new fetch request, it skips any buffered data, which has the unintended result that those partitions are added to the fetch request's "to remove" set. This signals the broker to remove those partitions from its internal cache.

This issue is technically possible in the `ClassicKafkaConsumer` too, since the heartbeat thread performs network I/O in addition to the application thread. However, because of the frequency at which the `AsyncKafkaConsumer`'s background thread runs, it is ~100x more likely to happen.
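To make the timing window concrete, here is a toy two-thread model of the race. This is not consumer code, just an illustration of the interleaving; all names are hypothetical:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model of the race between the application thread and the background
// thread: the application thread sees an empty buffer and signals for a fetch,
// but data from a *previous* fetch arrives and is buffered before the fetch
// request is actually built.
public class FetchRaceDemo {

    private static final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();

    public static void main(String[] args) throws InterruptedException {
        // Application thread: the buffer is empty, so signal the background thread.
        boolean signalFetch = buffer.isEmpty();

        // Background thread: a response from an in-flight fetch lands now,
        // after the check but before the fetch request is generated.
        Thread backgroundIo = new Thread(() -> buffer.add("records for partition topic-0"));
        backgroundIo.start();
        backgroundIo.join();

        if (signalFetch) {
            // The fetch request is built here. "topic-0" now has buffered data,
            // so it is skipped, and an incremental fetch session interprets the
            // omission as "remove topic-0 from the session."
            System.out.println("Building fetch request; skipping buffered: " + buffer);
        }
    }
}
```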
Options

The core decision was: what should the background thread do if it is asked to create a fetch request and it discovers there's buffered data? There were multiple proposals to address this issue in the `AsyncKafkaConsumer`. Among them:

1. The background thread omits buffered partitions from the fetch request, as before (the existing behavior)
2. The background thread skips fetch request generation entirely if there are any buffered partitions
3. The background thread includes buffered partitions in the fetch request, but uses a small "max bytes" value
4. The background thread skips fetching from the nodes that have buffered partitions

Option 4 won out. The change is localized to `AbstractFetch`, where the basic idea is to skip fetch requests to a given node if that node is the leader for buffered data. By preventing a fetch request from being sent to that node, it won't have any "holes" where the buffered partitions should be. A minimal sketch of this idea follows.
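In this hypothetical sketch, `FetchNodeFilter`, `LeaderLookup`, and `canFetchFromNode` are made-up names standing in for the real `AbstractFetch` internals, not Kafka APIs. A node is excluded from fetching while it leads any partition with buffered data:

```java
import java.util.Optional;
import java.util.Set;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch of the Option 4 idea: never send a fetch request to a
// node that currently leads a buffered partition, so the request has no
// "holes" that the broker would treat as session evictions.
public class FetchNodeFilter {

    /** Minimal lookup abstraction standing in for the consumer's metadata. */
    public interface LeaderLookup {
        Optional<Node> leaderFor(TopicPartition partition);
    }

    /**
     * Returns true if a fetch may be sent to the given node, i.e. the node is
     * not the leader of any partition whose data is still buffered locally.
     */
    public static boolean canFetchFromNode(Node node,
                                           Set<TopicPartition> bufferedPartitions,
                                           LeaderLookup leaderLookup) {
        for (TopicPartition partition : bufferedPartitions) {
            Optional<Node> leader = leaderLookup.leaderFor(partition);
            if (leader.isPresent() && leader.get().id() == node.id()) {
                // Fetching from this node would omit the buffered partition,
                // which the broker reads as a request to forget it. Skip the
                // node until its buffered data has been returned to the user.
                return false;
            }
        }
        return true;
    }
}
```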
Testing

Eviction rate testing

Here are the results of our internal stress testing:

- `ClassicKafkaConsumer`: after the initial spike during test startup, the average rate settles down to ~0.14 evictions/second
- `AsyncKafkaConsumer` (w/o fix): after startup, the evictions still settle down, but they are about 100x higher than the `ClassicKafkaConsumer`, at ~1.48 evictions/second
- `AsyncKafkaConsumer` (w/ fix): the eviction rate is now closer to the `ClassicKafkaConsumer`, at ~0.22 evictions/second

EndToEndLatency testing

The bundled `EndToEndLatency` test runner was executed on a single machine using Docker. The `apache/kafka:latest` Docker image was used with either the `cluster/combined/plaintext/docker-compose.yml` or `single-node/plaintext/docker-compose.yml` Docker Compose configuration file, depending on the test. The Docker containers were recreated from scratch before each test. A single topic was created with 30 partitions and a replication factor of either 1 or 3, depending on the single- or multi-node setup.
For each of the test runs, these argument values were used:

- `acks`: 1

A configuration file containing the single configuration value `group.protocol=<$group_protocol>` was also provided to the test, where `$group_protocol` was either `CLASSIC` or `CONSUMER`.
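As a side note, the same switch can be set programmatically when constructing a consumer. A minimal sketch, assuming a recent Kafka clients release (`ConsumerConfig.GROUP_PROTOCOL_CONFIG` exists as of 3.7):

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupProtocolExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "latency-test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // "consumer" selects the new AsyncKafkaConsumer-based protocol (KIP-848);
        // "classic" selects the original ClassicKafkaConsumer behavior.
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe and poll as usual
        }
    }
}
```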
Test results

- Test 1: `CLASSIC` group protocol, cluster size: 3 nodes, replication factor: 3
- Test 2: `CONSUMER` group protocol, cluster size: 3 nodes, replication factor: 3
- Test 3: `CLASSIC` group protocol, cluster size: 1 node, replication factor: 1
- Test 4: `CONSUMER` group protocol, cluster size: 1 node, replication factor: 1

Each test compared `trunk` against this PR. (Per-test latency tables omitted.)
Conclusion

These tests did not reveal any significant differences between the current fetcher logic on `trunk` and the one proposed in this PR. Additional test runs using larger message counts and/or larger message sizes did not affect the results.